package com.xuecheng.service;

import com.xuecheng.model.po.MqMessage;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 消息处理抽象类
 */
@Slf4j
@Data
public abstract class MessageProcessAbstract {
	
	@Resource
	private MqMessageService mqMessageService;
	
	
	/**
	 * 任务处理
	 */
	public abstract boolean execute(MqMessage mqMessage);
	
	
	/**
	 * 扫描消息表多线程执行任务
	 */
	public void process(int shardIndex, int shardTotal, String messageType, int count, long timeout) {
		
		try {
			//扫描消息表获取任务清单
			List<MqMessage> messageList = mqMessageService.getMessageList(shardIndex, shardTotal, messageType, count);
			//任务个数
			int size = messageList.size();
			log.debug("取出待处理消息" + size + "条");
			if (size == 0) {
				return;
			}
			
			//创建线程池
			ExecutorService threadPool = Executors.newFixedThreadPool(size);
			//计数器
			CountDownLatch countDownLatch = new CountDownLatch(size);
			messageList.forEach(message -> {
				threadPool.execute(() -> {
					log.info("开始任务:{}", message);
					//处理任务
					try {
						boolean result = execute(message);
						if (result) {
							log.info("任务执行成功:{})", message);
							//更新任务状态,删除消息表记录,添加到历史表
							int completed = mqMessageService.completed(message.getId());
							if (completed > 0) {
								log.info("任务执行成功:{}", message);
							} else {
								log.error("任务执行失败:{}", message);
							}
						}
					} catch (Exception e) {
						log.error("任务出现异常:{},任务:{}", e.getMessage(), message);
					}
					//计数
					countDownLatch.countDown();
					log.info("结束任务:{}", message);
				});
			});
			
			//等待,给一个充裕的超时时间,防止无限等待，到达超时时间还没有处理完成则结束任务
			countDownLatch.await(timeout, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			e.printStackTrace();
			log.error("任务执行异常:{}", e.getMessage());
		}
		
		
	}
	
	
}
